/*
 * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
 * which is available at https://www.apache.org/licenses/LICENSE-2.0.
 *
 * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
 */

package io.vertx.core.impl;

import java.io.File;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.concurrent.GenericFutureListener;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Closeable;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.TimeoutStream;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.datagram.DatagramSocket;
import io.vertx.core.datagram.DatagramSocketOptions;
import io.vertx.core.datagram.impl.DatagramSocketImpl;
import io.vertx.core.dns.AddressResolverOptions;
import io.vertx.core.dns.DnsClient;
import io.vertx.core.dns.DnsClientOptions;
import io.vertx.core.dns.impl.DnsClientImpl;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.impl.EventBusImpl;
import io.vertx.core.eventbus.impl.EventBusInternal;
import io.vertx.core.eventbus.impl.clustered.ClusteredEventBus;
import io.vertx.core.file.FileSystem;
import io.vertx.core.file.impl.FileResolver;
import io.vertx.core.file.impl.FileSystemImpl;
import io.vertx.core.file.impl.WindowsFileSystem;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.impl.HttpClientImpl;
import io.vertx.core.http.impl.HttpServerImpl;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.impl.resolver.DnsResolverProvider;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.impl.NetClientImpl;
import io.vertx.core.net.impl.NetServerImpl;
import io.vertx.core.net.impl.ServerID;
import io.vertx.core.net.impl.TCPServerBase;
import io.vertx.core.net.impl.transport.Transport;
import io.vertx.core.shareddata.SharedData;
import io.vertx.core.shareddata.impl.SharedDataImpl;
import io.vertx.core.spi.VerticleFactory;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeSelector;
import io.vertx.core.spi.metrics.Metrics;
import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.spi.metrics.PoolMetrics;
import io.vertx.core.spi.metrics.VertxMetrics;
import io.vertx.core.spi.tracing.VertxTracer;

/**
 * Fork note: only minor changes from upstream — when creating a WorkerContext,
 * deployments flagged as multi-threaded are given a context whose tasks may
 * execute concurrently (see {@code createWorkerContext}).
 * 
 * @author <a href="http://tfox.org">Tim Fox</a>
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public class VertxImpl implements VertxInternal, MetricsProvider {

	private static final Logger log = LoggerFactory.getLogger(VertxImpl.class);

	private static final String CLUSTER_MAP_NAME = "__vertx.haInfo";
	private static final String NETTY_IO_RATIO_PROPERTY_NAME = "vertx.nettyIORatio";
	private static final int NETTY_IO_RATIO = Integer.getInteger(NETTY_IO_RATIO_PROPERTY_NAME, 50);

	static {
		// Disable Netty's resource leak detection to reduce the performance overhead,
		// but ONLY when the user has not configured a level themselves.
		// (The previous condition was inverted: it disabled detection exactly when
		// the user HAD set a level, silently overriding their configuration.)
		// Supports both the default netty leak detection system property and the
		// deprecated one
		if (System.getProperty("io.netty.leakDetection.level") == null
				&& System.getProperty("io.netty.leakDetectionLevel") == null) {
			ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
		}

		// Use the JDK deflater/inflater by default
		System.setProperty("io.netty.noJdkZlibDecoder", "false");
	}

	private final FileSystem fileSystem = getFileSystem();
	private final SharedData sharedData;
	// Metrics SPI instance; null when metrics are disabled
	private final VertxMetrics metrics;
	// Live timers indexed by timer id; entries are removed when a timer fires
	// (one-shot) or is cancelled — ownership of removal decides the race
	private final ConcurrentMap<Long, InternalTimerHandler> timeouts = new ConcurrentHashMap<>();
	// Source of unique timer ids
	private final AtomicLong timeoutCounter = new AtomicLong(0);
	// null when not clustered
	private final ClusterManager clusterManager;
	private final NodeSelector nodeSelector;
	private final DeploymentManager deploymentManager;
	private final VerticleManager verticleManager;
	private final FileResolver fileResolver;
	// Servers shared between verticle instances listening on the same host:port
	private final Map<ServerID, HttpServerImpl> sharedHttpServers = new HashMap<>();
	private final Map<ServerID, NetServerImpl> sharedNetServers = new HashMap<>();
	final WorkerPool workerPool;
	final WorkerPool internalBlockingPool;
	private final ThreadFactory eventLoopThreadFactory;
	private final EventLoopGroup eventLoopGroup;
	// Kept separate from the IO event loop group to avoid accept lag under load
	private final EventLoopGroup acceptorEventLoopGroup;
	// Watchdog that warns when event loop / worker threads block for too long
	private final BlockedThreadChecker checker;
	private final AddressResolver addressResolver;
	private final AddressResolverOptions addressResolverOptions;
	private final EventBusInternal eventBus;
	// Set asynchronously during clustered init; null when not clustered / not ready
	private volatile HAManager haManager;
	// Guarded by "this": written in synchronized close(Handler), read under
	// synchronized(this) in deployVerticle(Callable, ...)
	private boolean closed;
	private volatile Handler<Throwable> exceptionHandler;
	private final Map<String, SharedWorkerPool> namedWorkerPools;
	private final int defaultWorkerPoolSize;
	private final long maxWorkerExecTime;
	private final TimeUnit maxWorkerExecTimeUnit;
	private final long maxEventLoopExecTime;
	private final TimeUnit maxEventLoopExecTimeUnit;
	private final CloseHooks closeHooks;
	private final Transport transport;
	private final VertxTracer tracer;
	// Remembers the context last created by getOrCreateContext() for a given
	// non-Vert.x thread, so repeated calls on that thread reuse it
	private final ThreadLocal<WeakReference<AbstractContext>> stickyContext = new ThreadLocal<>();

	/**
	 * Builds the instance: blocked-thread checker, acceptor and IO event loop
	 * groups, worker and internal-blocking pools, address resolver, event bus
	 * (clustered or local) and the deployment/verticle managers. Startup is
	 * completed afterwards by {@code init()} or {@code initClustered(...)}.
	 */
	VertxImpl(VertxOptions options, ClusterManager clusterManager, NodeSelector nodeSelector, VertxMetrics metrics,
			VertxTracer<?, ?> tracer, Transport transport, FileResolver fileResolver) {
		// Sanity check
		if (Vertx.currentContext() != null) {
			log.warn("You're already on a Vert.x context, are you sure you want to create a new Vertx instance?");
		}
		closeHooks = new CloseHooks(log);
		maxEventLoopExecTime = options.getMaxEventLoopExecuteTime();
		maxEventLoopExecTimeUnit = options.getMaxEventLoopExecuteTimeUnit();
		checker = new BlockedThreadChecker(options.getBlockedThreadCheckInterval(),
				options.getBlockedThreadCheckIntervalUnit(), options.getWarningExceptionTime(),
				options.getWarningExceptionTimeUnit());
		eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false,
				maxEventLoopExecTime, maxEventLoopExecTimeUnit);
		eventLoopGroup = transport.eventLoopGroup(Transport.IO_EVENT_LOOP_GROUP, options.getEventLoopPoolSize(),
				eventLoopThreadFactory, NETTY_IO_RATIO);
		ThreadFactory acceptorEventLoopThreadFactory = new VertxThreadFactory("vert.x-acceptor-thread-", checker, false,
				options.getMaxEventLoopExecuteTime(), options.getMaxEventLoopExecuteTimeUnit());
		// The acceptor event loop thread needs to be from a different pool otherwise
		// can get lags in accepted connections
		// under a lot of load
		acceptorEventLoopGroup = transport.eventLoopGroup(Transport.ACCEPTOR_EVENT_LOOP_GROUP, 1,
				acceptorEventLoopThreadFactory, 100);

		// Fixed-size worker pool backed by an unbounded queue (tasks never rejected)
		int workerPoolSize = options.getWorkerPoolSize();
		ExecutorService workerExec = new ThreadPoolExecutor(workerPoolSize, workerPoolSize, 0L, TimeUnit.MILLISECONDS,
				new LinkedTransferQueue<>(), new VertxThreadFactory("vert.x-worker-thread-", checker, true,
						options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit()));
		PoolMetrics workerPoolMetrics = metrics != null
				? metrics.createPoolMetrics("worker", "vert.x-worker-thread", options.getWorkerPoolSize())
				: null;
		// Separate pool for Vert.x-internal blocking work so user tasks can't starve it
		ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),
				new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime(),
						options.getMaxWorkerExecuteTimeUnit()));
		PoolMetrics internalBlockingPoolMetrics = metrics != null
				? metrics.createPoolMetrics("worker", "vert.x-internal-blocking", options.getInternalBlockingPoolSize())
				: null;
		internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics);
		namedWorkerPools = new HashMap<>();
		workerPool = new WorkerPool(workerExec, workerPoolMetrics);
		defaultWorkerPoolSize = options.getWorkerPoolSize();
		maxWorkerExecTime = options.getMaxWorkerExecuteTime();
		maxWorkerExecTimeUnit = options.getMaxWorkerExecuteTimeUnit();

		this.metrics = metrics;
		this.transport = transport;
		this.fileResolver = fileResolver;
		this.addressResolverOptions = options.getAddressResolverOptions();
		this.addressResolver = new AddressResolver(this, options.getAddressResolverOptions());
		this.tracer = tracer;
		this.clusterManager = clusterManager;
		this.nodeSelector = nodeSelector;
		// Clustered event bus when a cluster manager is present, local otherwise
		this.eventBus = clusterManager != null ? new ClusteredEventBus(this, options, clusterManager, nodeSelector)
				: new EventBusImpl(this);
		this.sharedData = new SharedDataImpl(this, clusterManager);
		this.deploymentManager = new DeploymentManager(this);
		this.verticleManager = new VerticleManager(this, deploymentManager);
	}

	// Completes creation of a NON-clustered instance: starts the local event bus
	// and reports creation to the metrics SPI.
	void init() {
		eventBus.start(Promise.promise());
		if (metrics != null) {
			metrics.vertxCreated(this);
		}
	}

	// Completes creation of a CLUSTERED instance: joins the cluster, then builds
	// the HA manager / starts the clustered event bus (see createHaManager), and
	// finally reports success to the result handler. On any failure the instance
	// is closed before the error is reported.
	void initClustered(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
		nodeSelector.init(this, clusterManager);
		clusterManager.init(this, nodeSelector);
		Promise<Void> initPromise = getOrCreateContext().promise();
		initPromise.future().onComplete(ar -> {
			if (ar.succeeded()) {
				if (metrics != null) {
					metrics.vertxCreated(this);
				}
				resultHandler.handle(Future.succeededFuture(this));
			} else {
				log.error("Failed to initialize clustered Vert.x", ar.cause());
				// Clean up the partially started instance before reporting failure
				close().onComplete(ignore -> resultHandler.handle(Future.failedFuture(ar.cause())));
			}
		});
		Promise<Void> joinPromise = Promise.promise();
		joinPromise.future().onComplete(ar -> {
			if (ar.succeeded()) {
				createHaManager(options, initPromise);
			} else {
				initPromise.fail(ar.cause());
			}
		});
		clusterManager.join(joinPromise);
	}

	// Builds the HA manager on a worker thread (getSyncMap may block on cluster
	// I/O), then proceeds to start the event bus.
	private void createHaManager(VertxOptions options, Promise<Void> initPromise) {
		this.<HAManager>executeBlocking(fut -> {
			Map<String, String> syncMap = clusterManager.getSyncMap(CLUSTER_MAP_NAME);
			HAManager haManager = new HAManager(this, deploymentManager, verticleManager, clusterManager, syncMap,
					options.getQuorumSize(), options.getHAGroup(), options.isHAEnabled());
			fut.complete(haManager);
		}, false, ar -> {
			if (ar.succeeded()) {
				haManager = ar.result();
				startEventBus(initPromise);
			} else {
				initPromise.fail(ar.cause());
			}
		});
	}

	// Starts the clustered event bus, then initializes the HA manager.
	private void startEventBus(Promise<Void> initPromise) {
		Promise<Void> promise = Promise.promise();
		eventBus.start(promise);
		promise.future().onComplete(ar -> {
			if (ar.succeeded()) {
				initializeHaManager(initPromise);
			} else {
				initPromise.fail(ar.cause());
			}
		});
	}

	// Final clustered-init step; runs on a worker thread since HA init may block.
	private void initializeHaManager(Promise<Void> initPromise) {
		this.executeBlocking(fut -> {
			// Init the manager (i.e register listener and check the quorum)
			// after the event bus has been fully started and updated its state
			// it will have also set the clustered changed view handler on the ha manager
			haManager.init();
			fut.complete();
		}, false, initPromise);
	}

	/**
	 * Selects the {@link FileSystem} implementation matching the host OS.
	 *
	 * @return the FileSystem implementation for the OS
	 */
	protected FileSystem getFileSystem() {
		if (Utils.isWindows()) {
			return new WindowsFileSystem(this);
		}
		return new FileSystemImpl(this);
	}

	// Maximum time an event loop may execute before being flagged as blocked.
	@Override
	public long maxEventLoopExecTime() {
		return maxEventLoopExecTime;
	}

	@Override
	public TimeUnit maxEventLoopExecTimeUnit() {
		return maxEventLoopExecTimeUnit;
	}

	@Override
	public DatagramSocket createDatagramSocket(DatagramSocketOptions options) {
		return DatagramSocketImpl.create(this, options);
	}

	// Convenience overload using default options.
	@Override
	public DatagramSocket createDatagramSocket() {
		return createDatagramSocket(new DatagramSocketOptions());
	}

	public NetServer createNetServer(NetServerOptions options) {
		return new NetServerImpl(this, options);
	}

	@Override
	public NetServer createNetServer() {
		return createNetServer(new NetServerOptions());
	}

	// Variant used internally: the caller owns the close future's lifecycle.
	@Override
	public NetClient createNetClient(NetClientOptions options, CloseFuture closeFuture) {
		NetClientImpl client = new NetClientImpl(this, options, closeFuture);
		closeFuture.init(client);
		return client;
	}

	public NetClient createNetClient(NetClientOptions options) {
		CloseFuture closeFuture = new CloseFuture();
		NetClient client = createNetClient(options, closeFuture);
		// Register with the current scope's close hooks so the client is closed
		// automatically when the owning verticle/Vertx instance closes
		CloseHooks hooks = resolveHooks();
		hooks.add(closeFuture);
		return client;
	}

	@Override
	public NetClient createNetClient() {
		return createNetClient(new NetClientOptions());
	}

	@Override
	public Transport transport() {
		return transport;
	}

	// True when a native (non-NIO) transport such as epoll/kqueue is in use.
	@Override
	public boolean isNativeTransportEnabled() {
		return transport != Transport.JDK;
	}

	public FileSystem fileSystem() {
		return fileSystem;
	}

	public SharedData sharedData() {
		return sharedData;
	}

	public HttpServer createHttpServer(HttpServerOptions serverOptions) {
		return new HttpServerImpl(this, serverOptions);
	}

	@Override
	public HttpServer createHttpServer() {
		return createHttpServer(new HttpServerOptions());
	}

	// Variant used internally: the caller owns the close future's lifecycle.
	@Override
	public HttpClient createHttpClient(HttpClientOptions options, CloseFuture closeFuture) {
		HttpClientImpl client = new HttpClientImpl(this, options, closeFuture);
		closeFuture.init(client);
		return client;
	}

	public HttpClient createHttpClient(HttpClientOptions options) {
		CloseFuture closeFuture = new CloseFuture();
		HttpClient client = createHttpClient(options, closeFuture);
		// Register with the current scope's close hooks so the client is closed
		// automatically when the owning verticle/Vertx instance closes
		CloseHooks hooks = resolveHooks();
		hooks.add(closeFuture);
		return client;
	}

	@Override
	public HttpClient createHttpClient() {
		return createHttpClient(new HttpClientOptions());
	}

	public EventBus eventBus() {
		return eventBus;
	}

	// Schedules a repeating timer on the current (or a new) context; returns the timer id.
	public long setPeriodic(long delay, Handler<Long> handler) {
		return scheduleTimeout(getOrCreateContext(), handler, delay, true);
	}

	@Override
	public TimeoutStream periodicStream(long delay) {
		return new TimeoutStreamImpl(delay, true);
	}

	// Schedules a one-shot timer on the current (or a new) context; returns the timer id.
	public long setTimer(long delay, Handler<Long> handler) {
		return scheduleTimeout(getOrCreateContext(), handler, delay, false);
	}

	@Override
	public TimeoutStream timerStream(long delay) {
		return new TimeoutStreamImpl(delay, false);
	}

	// Creates a promise bound to the current (or a freshly created) context.
	@Override
	public <T> PromiseInternal<T> promise() {
		return getOrCreateContext().promise();
	}

	// Adapts a plain handler into a context-bound promise; if the handler is
	// already a PromiseInternal it is returned as-is.
	@Override
	public <T> PromiseInternal<T> promise(Handler<AsyncResult<T>> handler) {
		if (handler instanceof PromiseInternal) {
			return (PromiseInternal<T>) handler;
		}
		PromiseInternal<T> adapted = promise();
		adapted.future().onComplete(handler);
		return adapted;
	}

	// Runs the task on the current (or a freshly created) context.
	public void runOnContext(Handler<Void> task) {
		getOrCreateContext().runOnContext(task);
	}

	// The background pool is used for making blocking calls to legacy synchronous
	// APIs
	public ExecutorService getWorkerPool() {
		return workerPool.executor();
	}

	public EventLoopGroup getEventLoopGroup() {
		return eventLoopGroup;
	}

	// Dedicated single-loop group used only for accepting connections.
	public EventLoopGroup getAcceptorEventLoopGroup() {
		return acceptorEventLoopGroup;
	}

	/**
	 * Returns the calling thread's context, creating an event-loop context when
	 * there is none (embedded usage). The created context is remembered in the
	 * thread-local {@link #stickyContext} weak reference so later calls from the
	 * same non-Vert.x thread reuse it rather than creating a new one each time.
	 */
	public ContextInternal getOrCreateContext() {
		AbstractContext ctx = getContext();
		if (ctx == null) {
			// We are running embedded - Create a context
			ctx = createEventLoopContext();
			stickyContext.set(new WeakReference<>(ctx));
		}
		return ctx;
	}

	// Registry of HTTP servers shared across instances bound to the same address.
	public Map<ServerID, HttpServerImpl> sharedHttpServers() {
		return sharedHttpServers;
	}

	// Registry of TCP servers shared across instances bound to the same address.
	public Map<ServerID, NetServerImpl> sharedNetServers() {
		return sharedNetServers;
	}

	// Dispatches to the matching shared-server registry by concrete server type.
	@Override
	public <S extends TCPServerBase> Map<ServerID, S> sharedTCPServers(Class<S> type) {
		if (type == NetServerImpl.class) {
			return (Map<ServerID, S>) sharedNetServers;
		}
		if (type == HttpServerImpl.class) {
			return (Map<ServerID, S>) sharedHttpServers;
		}
		throw new IllegalStateException();
	}

	@Override
	public boolean isMetricsEnabled() {
		return metrics != null;
	}

	@Override
	public Metrics getMetrics() {
		return metrics;
	}

	// Cancels a timer by id. Removing the entry from the map claims ownership of
	// the timer's termination; returns false when the timer already fired or was
	// cancelled by another thread.
	public boolean cancelTimer(long id) {
		InternalTimerHandler timer = timeouts.remove(id);
		if (timer == null) {
			return false;
		}
		timer.cancel();
		return true;
	}

	// Creates an event-loop context bound to the next loop in the IO group.
	// A null workerPool selects this instance's default worker pool.
	@Override
	public EventLoopContext createEventLoopContext(Deployment deployment, CloseHooks closeHooks, WorkerPool workerPool,
			ClassLoader tccl) {
		return new EventLoopContext(this, tracer, eventLoopGroup.next(), internalBlockingPool,
				workerPool != null ? workerPool : this.workerPool, deployment, closeHooks, tccl);
	}

	// Variant pinned to a caller-supplied event loop (no deployment, no hooks).
	@Override
	public EventLoopContext createEventLoopContext(EventLoop eventLoop, WorkerPool workerPool, ClassLoader tccl) {
		return new EventLoopContext(this, tracer, eventLoop, internalBlockingPool,
				workerPool != null ? workerPool : this.workerPool, null, null, tccl);
	}

	@Override
	public EventLoopContext createEventLoopContext() {
		return createEventLoopContext(null, null, null, Thread.currentThread().getContextClassLoader());
	}

	// Fork modification: deployments flagged as multi-threaded get a
	// MultiThreadedWorkerContext, whose tasks may run concurrently instead of
	// the ordered single-task-at-a-time semantics of a plain WorkerContext.
	@Override
	public WorkerContext createWorkerContext(Deployment deployment, CloseHooks closeHooks, WorkerPool workerPool,
			ClassLoader tccl) {
		if (deployment != null && deployment.deploymentOptions().isMultiThreaded()) {
			return new MultiThreadedWorkerContext(this, tracer, internalBlockingPool,
					workerPool != null ? workerPool : this.workerPool, deployment, closeHooks, tccl);
		}
		return new WorkerContext(this, tracer, internalBlockingPool, workerPool != null ? workerPool : this.workerPool,
				deployment, closeHooks, tccl);
	}

	// Creates a bare worker context. Passes the caller's context class loader,
	// consistent with createEventLoopContext() above (previously passed null,
	// leaving the context without a TCCL unlike its event-loop counterpart).
	@Override
	public WorkerContext createWorkerContext() {
		return createWorkerContext(null, null, null, Thread.currentThread().getContextClassLoader());
	}

	@Override
	public DnsClient createDnsClient(int port, String host) {
		return createDnsClient(new DnsClientOptions().setHost(host).setPort(port));
	}

	@Override
	public DnsClient createDnsClient() {
		return createDnsClient(new DnsClientOptions());
	}

	// When no host or no valid port is configured, falls back to the first name
	// server of the configured address resolver (e.g. the system resolver).
	@Override
	public DnsClient createDnsClient(DnsClientOptions options) {
		String host = options.getHost();
		int port = options.getPort();
		if (host == null || port < 0) {
			DnsResolverProvider provider = new DnsResolverProvider(this, addressResolverOptions);
			InetSocketAddress address = provider.nameServerAddresses().get(0);
			// provide the host and port
			options = new DnsClientOptions(options).setHost(address.getAddress().getHostAddress())
					.setPort(address.getPort());
		}
		return new DnsClientImpl(this, options);
	}

	/**
	 * Allocates a timer id, schedules the timer on the context's event loop and
	 * records it in {@link #timeouts}. Deployment contexts also get a close hook
	 * so the timer is cancelled on undeploy.
	 *
	 * @throws IllegalArgumentException if {@code delay} is less than 1 ms
	 */
	public long scheduleTimeout(ContextInternal context, Handler<Long> handler, long delay, boolean periodic) {
		if (delay < 1) {
			throw new IllegalArgumentException("Cannot schedule a timer with delay < 1 ms");
		}
		long timerId = timeoutCounter.getAndIncrement();
		// NOTE(review): the handler's constructor schedules the netty task BEFORE
		// the map insert below; a very short one-shot timer firing first would see
		// no map entry and skip its handler, leaving a stale entry — confirm this
		// window is acceptable or reorder.
		InternalTimerHandler task = new InternalTimerHandler(timerId, handler, periodic, delay, context);
		timeouts.put(timerId, task);
		if (context.isDeployment()) {
			context.addCloseHook(task);
		}
		return timerId;
	}

	/**
	 * Returns the current context when the calling thread is a Vert.x thread
	 * owned by THIS instance; otherwise falls back to the thread's sticky
	 * context (which may have been garbage-collected, yielding {@code null}).
	 */
	public AbstractContext getContext() {
		AbstractContext context = (AbstractContext) ContextInternal.current();
		if (context != null && context.owner() == this) {
			return context;
		} else {
			WeakReference<AbstractContext> ref = stickyContext.get();
			return ref != null ? ref.get() : null;
		}
	}

	// null when this instance is not clustered
	public ClusterManager getClusterManager() {
		return clusterManager;
	}

	@Override
	public Future<Void> close() {
		// Create this promise purposely without a context because the close operation
		// will close thread pools
		Promise<Void> promise = Promise.promise();
		close(promise);
		return promise.future();
	}

	// Leaves the cluster (if clustered). Always reports SUCCESS to the completion
	// handler — a failed leave is only logged so shutdown can proceed.
	private void closeClusterManager(Handler<AsyncResult<Void>> completionHandler) {
		Promise<Void> leavePromise = getOrCreateContext().promise();
		if (clusterManager != null) {
			clusterManager.leave(leavePromise);
		} else {
			leavePromise.complete();
		}
		leavePromise.future().onComplete(ar -> {
			if (ar.failed()) {
				log.error("Failed to leave cluster", ar.cause());
			}
			if (completionHandler != null) {
				completionHandler.handle(Future.succeededFuture());
			}
		});
	}

	// Orderly shutdown: run close hooks -> undeploy all verticles -> stop the HA
	// manager (on a worker thread) -> close the DNS resolver -> close the event
	// bus -> leave the cluster -> delete the file cache and shut down pools and
	// event loop groups. Synchronized so the "closed" flag is set exactly once.
	@Override
	public synchronized void close(Handler<AsyncResult<Void>> completionHandler) {
		if (closed || eventBus == null) {
			// Just call the handler directly since pools shutdown
			if (completionHandler != null) {
				completionHandler.handle(Future.succeededFuture());
			}
			return;
		}
		closed = true;
		closeHooks.run(ar -> {
			deploymentManager.undeployAll().onComplete(ar1 -> {
				HAManager haManager = haManager();
				Promise<Void> haPromise = Promise.promise();
				if (haManager != null) {
					this.executeBlocking(fut -> {
						haManager.stop();
						fut.complete();
					}, false, haPromise);
				} else {
					haPromise.complete();
				}
				haPromise.future().onComplete(ar2 -> {
					addressResolver.close(ar3 -> {
						Promise<Void> ebClose = getOrCreateContext().promise();
						eventBus.close(ebClose);
						ebClose.future().onComplete(ar4 -> {
							closeClusterManager(ar5 -> {
								// Copy set to prevent ConcurrentModificationException
								deleteCacheDirAndShutdown(completionHandler);
							});
						});
					});
				});
			});
		});
	}

	@Override
	public Future<String> deployVerticle(String name) {
		return deployVerticle(name, new DeploymentOptions());
	}

	@Override
	public void deployVerticle(String name, Handler<AsyncResult<String>> completionHandler) {
		deployVerticle(name, new DeploymentOptions(), completionHandler);
	}

	// HA deployments are routed through the HA manager so they can be failed
	// over to another node; everything else goes through the verticle manager.
	@Override
	public Future<String> deployVerticle(String name, DeploymentOptions options) {
		if (!options.isHa() || haManager() == null || !haManager().isEnabled()) {
			return verticleManager.deployVerticle(name, options).map(Deployment::deploymentID);
		}
		Promise<String> promise = getOrCreateContext().promise();
		haManager().deployVerticle(name, options, promise);
		return promise.future();
	}

	@Override
	public void deployVerticle(String name, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {
		Future<String> deployment = deployVerticle(name, options);
		if (completionHandler != null) {
			deployment.onComplete(completionHandler);
		}
	}

	@Override
	public Future<String> deployVerticle(Verticle verticle) {
		return deployVerticle(verticle, new DeploymentOptions());
	}

	@Override
	public void deployVerticle(Verticle verticle, Handler<AsyncResult<String>> completionHandler) {
		Future<String> deployment = deployVerticle(verticle);
		if (completionHandler != null) {
			deployment.onComplete(completionHandler);
		}
	}

	// A single pre-created Verticle instance cannot be deployed more than once.
	@Override
	public Future<String> deployVerticle(Verticle verticle, DeploymentOptions options) {
		if (options.getInstances() != 1) {
			throw new IllegalArgumentException("Can't specify > 1 instances for already created verticle");
		}
		return deployVerticle((Callable<Verticle>) () -> verticle, options);
	}

	@Override
	public void deployVerticle(Verticle verticle, DeploymentOptions options,
			Handler<AsyncResult<String>> completionHandler) {
		Future<String> deployment = deployVerticle(verticle, options);
		if (completionHandler != null) {
			deployment.onComplete(completionHandler);
		}
	}

	// Instantiates the class reflectively once per requested instance.
	@Override
	public Future<String> deployVerticle(Class<? extends Verticle> verticleClass, DeploymentOptions options) {
		return deployVerticle((Callable<Verticle>) verticleClass::newInstance, options);
	}

	@Override
	public void deployVerticle(Class<? extends Verticle> verticleClass, DeploymentOptions options,
			Handler<AsyncResult<String>> completionHandler) {
		Future<String> deployment = deployVerticle(verticleClass, options);
		if (completionHandler != null) {
			deployment.onComplete(completionHandler);
		}
	}

	@Override
	public Future<String> deployVerticle(Supplier<Verticle> verticleSupplier, DeploymentOptions options) {
		return deployVerticle((Callable<Verticle>) verticleSupplier::get, options);
	}

	@Override
	public void deployVerticle(Supplier<Verticle> verticleSupplier, DeploymentOptions options,
			Handler<AsyncResult<String>> completionHandler) {
		Future<String> deployment = deployVerticle(verticleSupplier, options);
		if (completionHandler != null) {
			deployment.onComplete(completionHandler);
		}
	}

	// Common deployment path: checks the closed flag under the instance lock,
	// then delegates to the deployment manager.
	private Future<String> deployVerticle(Callable<Verticle> verticleSupplier, DeploymentOptions options) {
		synchronized (this) {
			if (this.closed) {
				// If we are closed use a context less future
				return Future.failedFuture("Vert.x closed");
			}
		}
		return deploymentManager.deployVerticle(verticleSupplier, options);
	}

	// When HA is enabled, first removes the deployment from the HA state (on a
	// worker thread) so it won't be failed over, then undeploys it.
	@Override
	public Future<Void> undeploy(String deploymentID) {
		Future<Void> future;
		HAManager haManager = haManager();
		if (haManager != null && haManager.isEnabled()) {
			future = this.executeBlocking(fut -> {
				haManager.removeFromHA(deploymentID);
				fut.complete();
			}, false);
		} else {
			future = getOrCreateContext().succeededFuture();
		}
		return future.compose(v -> deploymentManager.undeployVerticle(deploymentID));
	}

	@Override
	public void undeploy(String deploymentID, Handler<AsyncResult<Void>> completionHandler) {
		Future<Void> fut = undeploy(deploymentID);
		if (completionHandler != null) {
			fut.onComplete(completionHandler);
		}
	}

	@Override
	public Set<String> deploymentIDs() {
		return deploymentManager.deployments();
	}

	@Override
	public void registerVerticleFactory(VerticleFactory factory) {
		verticleManager.registerVerticleFactory(factory);
	}

	@Override
	public void unregisterVerticleFactory(VerticleFactory factory) {
		verticleManager.unregisterVerticleFactory(factory);
	}

	@Override
	public Set<VerticleFactory> verticleFactories() {
		return verticleManager.verticleFactories();
	}

	// Runs Vert.x-internal blocking work on the internal blocking pool, ordered.
	@Override
	public <T> void executeBlockingInternal(Handler<Promise<T>> blockingCodeHandler,
			Handler<AsyncResult<T>> resultHandler) {
		getOrCreateContext().executeBlockingInternal(blockingCodeHandler, resultHandler);
	}

	// Runs Vert.x-internal blocking work with explicit ordering control.
	@Override
	public <T> void executeBlockingInternal(Handler<Promise<T>> blockingCodeHandler, boolean ordered,
			Handler<AsyncResult<T>> resultHandler) {
		getOrCreateContext().executeBlockingInternal(blockingCodeHandler, ordered, resultHandler);
	}

	// Runs user blocking code on the worker pool of the current/created context.
	@Override
	public <T> Future<@Nullable T> executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered) {
		return getOrCreateContext().executeBlocking(blockingCodeHandler, ordered);
	}

	// Ordered by default.
	@Override
	public <T> Future<T> executeBlocking(Handler<Promise<T>> blockingCodeHandler) {
		return executeBlocking(blockingCodeHandler, true);
	}

	@Override
	public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered,
			Handler<AsyncResult<T>> asyncResultHandler) {
		getOrCreateContext().executeBlocking(blockingCodeHandler, ordered, asyncResultHandler);
	}

	// Ordered by default.
	@Override
	public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler,
			Handler<AsyncResult<T>> asyncResultHandler) {
		executeBlocking(blockingCodeHandler, true, asyncResultHandler);
	}

	@Override
	public boolean isClustered() {
		return clusterManager != null;
	}

	@Override
	public EventLoopGroup nettyEventLoopGroup() {
		return eventLoopGroup;
	}

	// For testing
	public void simulateKill() {
		if (haManager() != null) {
			haManager().simulateKill();
		}
	}

	@Override
	public Deployment getDeployment(String deploymentID) {
		return deploymentManager.getDeployment(deploymentID);
	}

	@Override
	public synchronized void failoverCompleteHandler(FailoverCompleteHandler failoverCompleteHandler) {
		if (haManager() != null) {
			haManager().setFailoverCompleteHandler(failoverCompleteHandler);
		}
	}

	// NOTE(review): unlike the other HA methods this does NOT null-check
	// haManager() and will NPE when HA is not initialized — presumably only
	// called on HA-enabled instances; confirm with callers.
	@Override
	public boolean isKilled() {
		return haManager().isKilled();
	}

	// For testing: makes the next failover attempt fail.
	@Override
	public void failDuringFailover(boolean fail) {
		if (haManager() != null) {
			haManager().failDuringFailover(fail);
		}
	}

	@Override
	public VertxMetrics metricsSPI() {
		return metrics;
	}

	// Resolves a file name via the file resolver (handles classpath resources
	// by extracting them to the file cache).
	@Override
	public File resolveFile(String fileName) {
		return fileResolver.resolveFile(fileName);
	}

	// Asynchronous hostname resolution using the Vert.x address resolver.
	@Override
	public void resolveAddress(String hostname, Handler<AsyncResult<InetAddress>> resultHandler) {
		addressResolver.resolveHostname(hostname, resultHandler);
	}

	@Override
	public AddressResolver addressResolver() {
		return addressResolver;
	}

	@Override
	public AddressResolverGroup<InetSocketAddress> nettyAddressResolverGroup() {
		return addressResolver.nettyAddressResolverGroup();
	}

	@Override
	public BlockedThreadChecker blockedThreadChecker() {
		return checker;
	}

	// Final shutdown stage: closes the file resolver (deleting the file cache),
	// closes the worker pools, then gracefully shuts down the acceptor and IO
	// event loop groups in sequence, and finally closes metrics, tracer and the
	// blocked-thread checker.
	private void deleteCacheDirAndShutdown(Handler<AsyncResult<Void>> completionHandler) {
		executeBlockingInternal(fut -> {
			try {
				fileResolver.close();
				fut.complete();
			} catch (IOException e) {
				fut.tryFail(e);
			}
		}, ar -> {

			workerPool.close();
			internalBlockingPool.close();
			// Copy to avoid concurrent modification while pools deregister themselves
			new ArrayList<>(namedWorkerPools.values()).forEach(WorkerPool::close);

			acceptorEventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).addListener(new GenericFutureListener() {
				@Override
				public void operationComplete(io.netty.util.concurrent.Future future) throws Exception {
					if (!future.isSuccess()) {
						log.warn("Failure in shutting down acceptor event loop group", future.cause());
					}
					eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).addListener(new GenericFutureListener() {
						@Override
						public void operationComplete(io.netty.util.concurrent.Future future) throws Exception {
							if (!future.isSuccess()) {
								log.warn("Failure in shutting down event loop group", future.cause());
							}
							if (metrics != null) {
								metrics.close();
							}
							if (tracer != null) {
								tracer.close();
							}

							checker.close();

							// The completion handler must not run on a dying event
							// loop thread, so hand it off to a fresh thread
							if (completionHandler != null) {
								eventLoopThreadFactory.newThread(() -> {
									completionHandler.handle(Future.succeededFuture());
								}).start();
							}
						}
					});
				}
			});
		});
	}

	/**
	 * @return the HA manager, or {@code null} when not clustered or not yet
	 *         initialized (it is set asynchronously during clustered init)
	 */
	public HAManager haManager() {
		return haManager;
	}

	/**
	 * Timers are stored in the {@link #timeouts} map at creation time.
	 * <p/>
	 * Timers are removed from the {@link #timeouts} map when they are cancelled or
	 * are fired. The thread removing the timer successfully owns the timer
	 * termination (i.e. cancel or fire) to avoid race conditions between timeout
	 * and cancellation.
	 * <p/>
	 * This class does not rely on the internal {@link #future} for the termination
	 * to handle the worker case since the actual timer {@link #handler} execution
	 * is scheduled when the {@link #future} executes.
	 */
	private class InternalTimerHandler implements Handler<Void>, Closeable, Runnable {

		private final Handler<Long> handler;
		private final boolean periodic;
		private final long timerID;
		private final ContextInternal context;
		// The underlying netty scheduled task
		private final java.util.concurrent.Future<?> future;

		InternalTimerHandler(long timerID, Handler<Long> runnable, boolean periodic, long delay,
				ContextInternal context) {
			this.context = context;
			this.timerID = timerID;
			this.handler = runnable;
			this.periodic = periodic;
			EventLoop el = context.nettyEventLoop();
			// Schedule directly on the context's netty event loop
			if (periodic) {
				future = el.scheduleAtFixedRate(this, delay, delay, TimeUnit.MILLISECONDS);
			} else {
				future = el.schedule(this, delay, TimeUnit.MILLISECONDS);
			}
		}

		// Invoked by the netty event loop when the timer expires; dispatches to
		// the owning context (which may be a worker context).
		@Override
		public void run() {
			context.emit(this);
		}

		public void handle(Void v) {
			if (periodic) {
				// Still registered => not cancelled; keep firing
				if (timeouts.containsKey(timerID)) {
					handler.handle(timerID);
				}
			} else if (timeouts.remove(timerID) != null) {
				// Successfully removing the entry claims ownership of the firing
				try {
					handler.handle(timerID);
				} finally {
					// Clean up after it's fired
					context.removeCloseHook(this);
				}
			}
		}

		// Caller (cancelTimer) has already removed the map entry, claiming ownership.
		private void cancel() {
			future.cancel(false);
			if (context.isDeployment()) {
				context.removeCloseHook(this);
			}
		}

		// Called via Context close hook when Verticle is undeployed
		public void close(Promise<Void> completion) {
			if (timeouts.remove(timerID) != null) {
				future.cancel(false);
			}
			completion.complete();
		}
	}

	/*
	 *
	 * This class is optimised for performance when used on the same event loop that
	 * it was passed to the handler with. However it can be used safely from other
	 * threads.
	 *
	 * The internal state is protected using the synchronized keyword. If always
	 * used on the same event loop, then we benefit from biased locking which makes
	 * the overhead of synchronized near zero.
	 *
	 */
	private class TimeoutStreamImpl implements TimeoutStream, Handler<Long> {

		// Delay in milliseconds between fires.
		private final long delay;
		// True for a repeating stream, false for a single fire.
		private final boolean periodic;

		// Timer id, assigned when a handler is set, null until then. Guarded by this.
		private Long id;
		private Handler<Long> handler;
		private Handler<Void> endHandler;
		// Back-pressure counter: number of fires the consumer will accept.
		private long demand;

		public TimeoutStreamImpl(long delay, boolean periodic) {
			this.delay = delay;
			this.periodic = periodic;
			this.demand = Long.MAX_VALUE;
		}

		@Override
		public synchronized void handle(Long event) {
			try {
				if (demand > 0) {
					demand--;
					handler.handle(event);
				}
			} finally {
				// A one-shot stream ends after its single fire, even when the fire
				// itself was suppressed because the stream was paused (demand == 0).
				if (!periodic && endHandler != null) {
					endHandler.handle(null);
				}
			}
		}

		@Override
		public synchronized TimeoutStream fetch(long amount) {
			demand += amount;
			if (demand < 0) {
				// Overflow: clamp to unbounded demand.
				demand = Long.MAX_VALUE;
			}
			return this;
		}

		@Override
		public TimeoutStream exceptionHandler(Handler<Throwable> handler) {
			// Timer streams cannot fail, so there is nothing to register.
			return this;
		}

		@Override
		public synchronized void cancel() {
			// FIX: made synchronized — id is written under the monitor in handler(...)
			// and the class comment states internal state is protected with
			// synchronized; an unsynchronized read here could observe a stale value
			// when cancelling from another thread.
			if (id != null) {
				VertxImpl.this.cancelTimer(id);
			}
		}

		@Override
		public synchronized TimeoutStream handler(Handler<Long> handler) {
			if (handler != null) {
				// Setting a handler arms the timer exactly once.
				if (id != null) {
					throw new IllegalStateException();
				}
				this.handler = handler;
				id = scheduleTimeout(getOrCreateContext(), this, delay, periodic);
			} else {
				// A null handler cancels the stream, per ReadStream semantics.
				cancel();
			}
			return this;
		}

		@Override
		public synchronized TimeoutStream pause() {
			demand = 0;
			return this;
		}

		@Override
		public synchronized TimeoutStream resume() {
			demand = Long.MAX_VALUE;
			return this;
		}

		@Override
		public synchronized TimeoutStream endHandler(Handler<Void> endHandler) {
			this.endHandler = endHandler;
			return this;
		}
	}

	class SharedWorkerPool extends WorkerPool {

		// Pool name, the key of this pool in namedWorkerPools.
		private final String name;
		// Number of executors currently sharing this pool; guarded by the
		// VertxImpl monitor.
		private int refCount = 1;

		SharedWorkerPool(String name, ExecutorService workerExec, PoolMetrics workerMetrics) {
			super(workerExec, workerMetrics);
			this.name = name;
		}

		// Forced shutdown: drops every remaining reference at once and closes the
		// underlying pool, unless it was already fully released.
		@Override
		void close() {
			synchronized (VertxImpl.this) {
				if (refCount == 0) {
					return;
				}
				refCount = 0;
				super.close();
			}
		}

		// Drops a single reference; the last one out removes the pool from the
		// registry and closes it.
		void release() {
			synchronized (VertxImpl.this) {
				refCount--;
				if (refCount == 0) {
					namedWorkerPools.remove(name);
					super.close();
				}
			}
		}
	}

	// Convenience overload: uses the instance-wide default worker pool size.
	@Override
	public WorkerExecutorImpl createSharedWorkerExecutor(String name) {
		return createSharedWorkerExecutor(name, defaultWorkerPoolSize);
	}

	// Convenience overload: uses the instance-wide default max execute time.
	@Override
	public WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize) {
		return createSharedWorkerExecutor(name, poolSize, maxWorkerExecTime);
	}

	// Convenience overload: uses the instance-wide default max-execute-time unit.
	// NOTE(review): synchronized here only guards the delegation itself; the
	// four-arg overload it calls is also synchronized.
	@Override
	public synchronized WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize, long maxExecuteTime) {
		return createSharedWorkerExecutor(name, poolSize, maxExecuteTime, maxWorkerExecTimeUnit);
	}

	/**
	 * Creates (or shares) the named worker pool and wraps it in an executor that is
	 * registered with the appropriate close-hook registry: the calling context's
	 * hooks when available, otherwise the Vert.x instance-level hooks.
	 */
	@Override
	public synchronized WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize, long maxExecuteTime,
			TimeUnit maxExecuteTimeUnit) {
		SharedWorkerPool sharedWorkerPool = createSharedWorkerPool(name, poolSize, maxExecuteTime, maxExecuteTimeUnit);
		// Consistency: reuse resolveHooks() instead of duplicating its logic inline.
		CloseHooks hooks = resolveHooks();
		// FIX: the executor must be constructed with the same hooks registry it is
		// added to below; the original passed the instance-level closeHooks even when
		// the executor was registered with the context's hooks, so closing the
		// executor would try to deregister from the wrong registry.
		WorkerExecutorImpl namedExec = new WorkerExecutorImpl(this, hooks, sharedWorkerPool);
		hooks.add(namedExec);
		return namedExec;
	}

	/**
	 * Returns the shared worker pool registered under {@code name}, bumping its
	 * reference count, or creates and registers a new one.
	 *
	 * @throws IllegalArgumentException when poolSize or maxExecuteTime is not positive
	 */
	public synchronized SharedWorkerPool createSharedWorkerPool(String name, int poolSize, long maxExecuteTime,
			TimeUnit maxExecuteTimeUnit) {
		if (poolSize < 1) {
			throw new IllegalArgumentException("poolSize must be > 0");
		}
		if (maxExecuteTime < 1) {
			throw new IllegalArgumentException("maxExecuteTime must be > 0");
		}
		SharedWorkerPool pool = namedWorkerPools.get(name);
		if (pool != null) {
			// Existing pool: take an additional reference (method is synchronized
			// on this, the same monitor SharedWorkerPool uses).
			pool.refCount++;
			return pool;
		}
		ExecutorService workerExec = Executors.newFixedThreadPool(poolSize,
				new VertxThreadFactory(name + "-", checker, true, maxExecuteTime, maxExecuteTimeUnit));
		PoolMetrics workerMetrics = metrics != null ? metrics.createPoolMetrics("worker", name, poolSize) : null;
		pool = new SharedWorkerPool(name, workerExec, workerMetrics);
		namedWorkerPools.put(name, pool);
		return pool;
	}

	/**
	 * Installs the instance-wide handler for otherwise-uncaught exceptions.
	 *
	 * @param handler the handler to install (a {@code null} clears it)
	 * @return this, for fluent chaining
	 */
	@Override
	public Vertx exceptionHandler(Handler<Throwable> handler) {
		this.exceptionHandler = handler;
		return this;
	}

	/**
	 * @return the currently installed instance-wide exception handler, possibly {@code null}
	 */
	@Override
	public Handler<Throwable> exceptionHandler() {
		return this.exceptionHandler;
	}

	// Registers a hook with the Vert.x instance-level close-hook registry.
	@Override
	public void addCloseHook(Closeable hook) {
		closeHooks.add(hook);
	}

	// Deregisters a hook from the Vert.x instance-level close-hook registry.
	@Override
	public void removeCloseHook(Closeable hook) {
		closeHooks.remove(hook);
	}

	/**
	 * Resolves the close-hook registry to use: the current context's hooks when
	 * running on a context that provides them, otherwise the instance-level hooks.
	 */
	private CloseHooks resolveHooks() {
		AbstractContext context = getContext();
		CloseHooks contextHooks = context != null ? context.closeHooks() : null;
		return contextHooks != null ? contextHooks : closeHooks;
	}
}
